RocketMQ Connect 實戰 5
Elasticsearch 來源 -> RocketMQ Connect -> Elasticsearch 儲存槽
準備工作
啟動 RocketMQ
- Linux/Unix/Mac
- 64 位元 JDK 1.8 以上;
- Maven 3.2.x 以上;
- 啟動 RocketMQ。可以使用 RocketMQ 4.x 或 RocketMQ 5.x 5.x 版本;
- 使用工具測試 RocketMQ 訊息傳送和接收。
在此,使用環境變數 NAMESRV_ADDR 來告知工具用戶端 RocketMQ 的 NameServer 位址為 localhost:9876。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
注意:RocketMQ 具備自動建立 Topic 和 Group 的功能,在發送或訂閱訊息時,若對應的 Topic 或 Group 不存在,RocketMQ 會自動建立,因此無需事先建立 Topic 和 Group。
以下為該內容的英文翻譯
建立連接器執行時間
複製儲存庫並建立 RocketMQ Connect 專案
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
建立 Elasticsearch 連接器外掛程式
建立 Elasticsearch RocketMQ 連接器外掛程式
cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-elasticsearch/
mvn clean package -Dmaven.test.skip=true
將編譯好的 Elasticsearch RocketMQ 連接器外掛程式 JAR 檔案複製到執行時間使用的外掛程式目錄中
mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-elasticsearch-1.0.0-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins
在獨立模式下執行連接器工作
修改 connect-standalone.conf
檔案以設定 RocketMQ 連線位址和其他資訊。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
範例設定資訊如下
workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
## Http port for user to access REST API
httpPort=8082
# Rocketmq namesrvAddr
namesrvAddr=localhost:9876
# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678
clusterName="DefaultCluster"
# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins
在獨立模式下,RocketMQ Connect 會將同步檢查點資訊持續儲存在 storePathRootDir 指定的本機檔案目錄中。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
如果您想重設同步檢查點,請刪除持續性檔案
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
在獨立模式下啟動連接器工作
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
設定 Elasticsearch 服務
Elasticsearch 是一個開源搜尋和分析引擎。
我們將使用兩個獨立的 Elasticsearch Docker 執行個體作為我們的來源和目的地資料庫
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.15.1
docker run --name es1 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
-v /Users/YourUsername/rocketmqconnect/es/es1_data:/usr/share/elasticsearch/data \
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1
docker run --name es2 -p 9201:9200 -p 9301:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data \
-d docker.elastic.co/elasticsearch/elasticsearch:7.15.1
Docker 指令說明
--name es2
:指定容器名稱,例如es2
。-p 9201:9200 -p 9301:9300
:將 Elasticsearch 容器上的埠 9200 和 9300 對應至主機埠 9201 和 9301,以便可透過主機存取 Elasticsearch 服務。-e discovery.type=single-node
:設定 Elasticsearch 在單一節點上執行,而不會發現叢集中的其他節點,適合單一伺服器部署。-v /Users/YourUsername/rocketmqconnect/es/es2_data:/usr/share/elasticsearch/data
:將主機上的目錄掛載到容器內的/usr/share/elasticsearch/data
,以持續儲存 Elasticsearch 資料。
這會執行 Elasticsearch 的自訂設定執行個體,並將持續資料儲存在容器中,可透過主機電腦上的 9200 埠存取,這對於本機電腦上的開發或測試環境非常實用。
檢視 Elasticsearch 記錄
docker logs -f es1
docker logs -f es2
驗證 Elasticsearch 是否已成功啟動
# Check Elasticsearch instance 1
curl -XGET https://127.0.0.1:9200
# Check Elasticsearch instance 2
curl -XGET https://127.0.0.1:9201
如果連線成功且運作正確,將會產生包含 Elasticsearch 及其版本號碼資訊的 JSON 回應。
設定 Kibana 服務
Kibana 是一個開放原始碼的資料視覺化工具,讓使用者可以互動式地探索和了解儲存在 Elasticsearch 群集中的資料。它提供豐富的功能,例如圖表、圖形和儀表板。
為了方便起見,我們將在 Docker 中設定兩個獨立的 Kibana 執行個體,並使用下列指令將它們連結到我們先前建立的 Elasticsearch 容器
docker pull docker.elastic.co/kibana/kibana:7.15.1
docker run --name kibana1 --link es1:elasticsearch -p 5601:5601 -d docker.elastic.co/kibana/kibana:7.15.1
docker run --name kibana2 --link es2:elasticsearch -p 5602:5601 -d docker.elastic.co/kibana/kibana:7.15.1
Docker 指令說明
--name kibana2
:指定新容器的名稱,例如 kibana2--link es2:elasticsearch
:將容器連結到另一個名為 Elasticsearch 的執行個體(在本例中為「es2」)。這會讓 Kibana 和 Elasticsearch 能夠互相通訊。-p 5602:5601
:將 Kibana 的預設埠 (5601) 對應到主機電腦上的同一個埠,以便透過瀏覽器存取。-d
:在分離模式下執行 Docker 容器。
容器啟動後,您可以監控其記錄輸出
docker logs -f kibana1
docker logs -f kibana2
若要存取 Kibana 主控台頁面,只需在瀏覽器中輸入下列網址
- kibana1:https://127.0.0.1:5601
- kibana2:https://127.0.0.1:5602
如果載入正確,表示對應的 Kibana 執行個體已成功啟動。
將測試資料寫入來源 Elasticsearch
Kibana 的開發工具可以協助您在 Kibana 中直接與 Elasticsearch 互動和操作。您可以執行各種查詢和操作,分析和了解回傳的資料。請參閱文件 console-kibana。
大量寫入測試資料
透過瀏覽器存取 Kibana1 主控台,從左側選單中找到開發工具,並在頁面上輸入下列指令以寫入測試資料
POST /_bulk
{ "index" : { "_index" : "connect_es" } }
{ "id": "1", "field1": "value1", "field2": "value2" }
{ "index" : { "_index" : "connect_es" } }
{ "id": "2", "field1": "value3", "field2": "value4" }
注意:
- connect_es:資料的索引名稱。
- id/field1/field2:這些是欄位名稱,而 1、value1、value2 則代表欄位的數值。
注意:rocketmq-connect-elasticsearch
有個限制,它需要資料中有一個欄位可用於 >= 比較運算(字串或數字)。這個欄位將用於記錄同步檢查點。在上面的範例中,id
欄位是一個全球唯一、遞增的數字欄位。
查詢資料
若要查詢索引中的資料,請使用下列指令
GET /connect_es/_search
{
"size": 100
}
如果沒有可用的資料,回應將會是
{
"error" : {
...
"type" : "index_not_found_exception",
"reason" : "no such index [connect_es]",
"resource.type" : "index_or_alias",
"resource.id" : "connect_es",
"index_uuid" : "_na_",
"index" : "connect_es"
},
"status" : 404
}
如果有的可用的資料,回應將會是
{
...
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "connect_es",
"_type" : "_doc",
"_id" : "_dx49osBb46Z9cN4hYCg",
"_score" : 1.0,
"_source" : {
"id" : "1",
"field1" : "value1",
"field2" : "value2"
}
},
{
"_index" : "connect_es",
"_type" : "_doc",
"_id" : "_tx49osBb46Z9cN4hYCg",
"_score" : 1.0,
"_source" : {
"id" : "2",
"field1" : "value3",
"field2" : "value4"
}
}
]
}
}
刪除資料
如果您需要刪除索引中的資料,由於重複測試或其他原因,您可以使用下列指令
DELETE /connect_es
啟動連接器
啟動 Elasticsearch 來源連接器
執行下列指令以啟動 ES 來源連接器。連接器將連接到 Elasticsearch 並從 connect_es 索引中讀取文件資料。它將解析 Elasticsearch 文件資料並將其封裝到一個通用 ConnectRecord 物件中,該物件將被傳送至 RocketMQ 主題供 Sink 連接器使用。
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSourceConnector -d '{
"connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSourceConnector",
"elasticsearchHost":"localhost",
"elasticsearchPort":9200,
"index":{
"connect_es": {
"primaryShards":1,
"id":1
}
},
"max.tasks":2,
"connect.topicname":"ConnectEsTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
注意:啟動指令指定來源 ES 應同步 connect_es 索引,而索引中的遞增欄位是 id。資料將從 id=1 開始擷取。
如果 curl 要求傳回狀態:200,表示建立成功,範例回應將會是
{"status":200,"body":{"connector.class":"...
如果您看到下列記錄,表示檔案來源連接器已成功啟動。
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
啟動連接器 elasticsearchSourceConnector 並設定目標狀態為已成功啟動!!
啟動 Elasticsearch Sink 連接器
執行下列指令以啟動 ES sink 連接器。連接器將訂閱 RocketMQ 主題的資料並使用它。它將轉換每則訊息為文件資料並將其寫入目的地 ES。
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/elasticsearchSinkConnector -d '{
"connector.class":"org.apache.rocketmq.connect.elasticsearch.connector.ElasticsearchSinkConnector",
"elasticsearchHost":"localhost",
"elasticsearchPort":9201,
"max.tasks":2,
"connect.topicnames":"ConnectEsTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
注意:啟動指令指定目的地 ES 的地址和埠,它對應到先前在 Docker 中啟動的 ES2。
如果 curl 要求傳回狀態:200,表示建立成功,範例回應將會是
{"status":200,"body":{"connector.class":"...
如果您看到下列記錄,表示檔案來源連接器已成功啟動
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
啟動連接器 elasticsearchSinkConnector 並設定目標狀態為已成功啟動!!
若要檢查 sink 連接器是否已將資料寫入目的地 ES 索引
- 在瀏覽器中存取 Kibana2 主控台地址:https://127.0.0.1:5602
- 在 Kibana2 Dev Tools 頁面中,查詢索引中的資料。如果它與來源 ES1 中的資料相符,表示連接器正常執行。
GET /connect_es/_search
{
"size": 100
}